package com.xunmall.mail.listener;

import com.xunmall.base.dto.KafkaMessage;
import com.xunmall.base.dto.Result;
import com.xunmall.email.dto.Email;
import com.xunmall.email.service.EmailValidate;
import com.xunmall.mail.component.mail.service.MailerTemplate;
import com.xunmall.mail.component.operation.service.OperationRecordService;
import com.xunmall.mail.constan.enums.SendStatus;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

import java.util.List;
import java.util.stream.Stream;


@Component
@Slf4j
public class MailListener {

    @Autowired
    private MailerTemplate template;

    @Autowired
    private OperationRecordService recordService;

    public void receive(KafkaMessage message) {
        try {
            Email email = (Email) message.getContent();
            EmailValidate.Validate(email);
            Stream.of(email.getAddressees()).forEach(item -> {
                log.info("prepare for send mail to " + item);
            });
            boolean flag = template.send(email);
            log.info("send mail result:{}", flag);
            log.info("prepare persistent operation record.");
            Result<String> result = recordService.addOperationRecord(email, flag ? SendStatus.finish : SendStatus.exception);
            if (result.getSuccess()) {
                log.error("persistent operation record fail. error:{}", result.getMessage());
            } else {
                log.info("persistent operation record success.");
            }
        } catch (Exception e) {
            log.error("send mail error : " + e.getMessage(), e);
        }
    }

    @KafkaListener(topics = "${xunmall.kafka.email.topic:xunmall-mail}")
    public void listenerMessage(List<KafkaMessage> records, Acknowledgment ack) {
        log.info("records.size: " + records.size() + " in all");
        for (KafkaMessage kafkaMessage : records) {
            receive(kafkaMessage);
        }
        log.info("start commit offset");
        ack.acknowledge();//手动提交偏移量
        log.info("stop commit offset");
    }

}
